package com.huan.table

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala._

//定义样例类，温度传感器
case class SensorReading( id: String, timestamp: Long, temperature: Double )

object Example {
  def main(args: Array[String]): Unit = {
    //开启流环境并且设置全局并行度(并行计算)
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //存放路径，并且读入这个路径
    val path = " E:\\Project\\FlinkTutorials\\Flink-Scala\\src\\main\\resources\\sensor.txt"
    val inputStream = env.readTextFile(path)
    //将路径用逗号切分并且返回给样例类
    val dataStream:DataStream[SensorReading] = inputStream.map(x => {
      val arr: Array[String] = x.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    })

    //创建表的执行环境
    val tableEnv = StreamTableEnvironment.create(env)

    //基于流创建一张表
    val dataTable:Table = tableEnv.fromDataStream(dataStream)

    //使用TableAPI实现
    val resultTable = dataTable.select("id,temperature")
      .filter("id == 'sensor_1'") //这个过滤的意思就是显示在控制台上

    //将类型显示出来并打印
    resultTable.toAppendStream[(String,Double)].print("TableAPI").setParallelism(2)

    println("============================")

    //直接用sql实现
    //创建临时表视图
    tableEnv.createTemporaryView("dataTables",dataTable)
    //SQL语句
    val sql = "select id,temperature from dataTables where id = 'sensor_10'"

    val resultSQL = tableEnv.sqlQuery(sql)
    resultSQL.toAppendStream[(String,Double)].print("SQL").setParallelism(1)

    //提交
    env.execute("Example")


  }
}
